lib/repo-pull: Support queuing delta superblock requests
authorPhilip Withnall <withnall@endlessm.com>
Mon, 28 May 2018 15:55:05 +0000 (16:55 +0100)
committerAtomic Bot <atomic-devel@projectatomic.io>
Wed, 30 May 2018 19:57:13 +0000 (19:57 +0000)
Just like all the other requests made for delta parts and objects by the
pull code, use a queue for delta superblocks. Currently this doesn’t do
any prioritisation or retries after transient failures, but it could do
in future.

This means that delta superblocks are now subject to the parallel
request limit in the fetcher, which was a problem highlighted here:
https://github.com/ostreedev/ostree/pull/1453#discussion_r168321706.

Signed-off-by: Philip Withnall <withnall@endlessm.com>
Closes: #1600
Approved by: jlebon

src/libostree/ostree-repo-pull.c

index 83ccff5ce7e2c64a034fc7dbc8b1187fa5462674..7d05c10f282a2da22c6a55a61e1eee3921b3eb68 100644 (file)
@@ -127,6 +127,7 @@ typedef struct {
   GHashTable       *requested_fallback_content; /* Maps checksum to itself */
   GHashTable       *pending_fetch_metadata; /* Map<ObjectName,FetchObjectData> */
   GHashTable       *pending_fetch_content; /* Map<checksum,FetchObjectData> */
+  GHashTable       *pending_fetch_delta_superblocks; /* Set<FetchDeltaSuperData> */
   GHashTable       *pending_fetch_deltaparts; /* Set<FetchStaticDeltaData> */
   guint             n_outstanding_metadata_fetches;
   guint             n_outstanding_metadata_write_requests;
@@ -206,6 +207,13 @@ typedef struct {
   OstreeCollectionRef *requested_ref;  /* (nullable) */
 } ScanObjectQueueData;
 
+typedef struct {
+  OtPullData *pull_data;
+  char *from_revision;
+  char *to_revision;
+  OstreeCollectionRef *requested_ref;  /* (nullable) */
+} FetchDeltaSuperData;
+
 static void
 variant_or_null_unref (gpointer data)
 {
@@ -216,6 +224,8 @@ variant_or_null_unref (gpointer data)
 static void start_fetch (OtPullData *pull_data, FetchObjectData *fetch);
 static void start_fetch_deltapart (OtPullData *pull_data,
                                    FetchStaticDeltaData *fetch);
+static void start_fetch_delta_superblock (OtPullData          *pull_data,
+                                          FetchDeltaSuperData *fetch_data);
 static gboolean fetcher_queue_is_full (OtPullData *pull_data);
 static void queue_scan_one_metadata_object (OtPullData                *pull_data,
                                             const char                *csum,
@@ -235,6 +245,8 @@ static void queue_scan_one_metadata_object_c (OtPullData                *pull_da
 
 static void enqueue_one_object_request_s (OtPullData      *pull_data,
                                           FetchObjectData *fetch_data);
+static void enqueue_one_static_delta_superblock_request_s (OtPullData          *pull_data,
+                                                           FetchDeltaSuperData *fetch_data);
 static void enqueue_one_static_delta_part_request_s (OtPullData           *pull_data,
                                                      FetchStaticDeltaData *fetch_data);
 
@@ -391,6 +403,7 @@ check_outstanding_requests_handle_error (OtPullData          *pull_data,
       g_queue_foreach (&pull_data->scan_object_queue, (GFunc) scan_object_queue_data_free, NULL);
       g_queue_clear (&pull_data->scan_object_queue);
       g_hash_table_remove_all (pull_data->pending_fetch_metadata);
+      g_hash_table_remove_all (pull_data->pending_fetch_delta_superblocks);
       g_hash_table_remove_all (pull_data->pending_fetch_deltaparts);
       g_hash_table_remove_all (pull_data->pending_fetch_content);
     }
@@ -423,6 +436,16 @@ check_outstanding_requests_handle_error (OtPullData          *pull_data,
           g_variant_unref (objname);
         }
 
+      /* Next, process delta superblock requests */
+      g_hash_table_iter_init (&hiter, pull_data->pending_fetch_delta_superblocks);
+      while (!fetcher_queue_is_full (pull_data) &&
+             g_hash_table_iter_next (&hiter, &key, &value))
+        {
+          FetchDeltaSuperData *fetch = key;
+          g_hash_table_iter_steal (&hiter);
+          start_fetch_delta_superblock (pull_data, g_steal_pointer (&fetch));
+        }
+
       /* Now, process deltapart requests */
       g_hash_table_iter_init (&hiter, pull_data->pending_fetch_deltaparts);
       while (!fetcher_queue_is_full (pull_data) &&
@@ -2648,13 +2671,6 @@ get_best_static_delta_start_for (OtPullData *pull_data,
   return TRUE;
 }
 
-typedef struct {
-  OtPullData *pull_data;
-  char *from_revision;
-  char *to_revision;
-  OstreeCollectionRef *requested_ref;  /* (nullable) */
-} FetchDeltaSuperData;
-
 static void
 fetch_delta_super_data_free (FetchDeltaSuperData *fetch_data)
 {
@@ -2757,6 +2773,59 @@ on_superblock_fetched (GObject   *src,
   g_clear_pointer (&fetch_data, fetch_delta_super_data_free);
 }
 
+static void
+start_fetch_delta_superblock (OtPullData          *pull_data,
+                              FetchDeltaSuperData *fetch_data)
+{
+  g_autofree char *delta_name =
+    _ostree_get_relative_static_delta_superblock_path (fetch_data->from_revision,
+                                                       fetch_data->to_revision);
+  _ostree_fetcher_request_to_membuf (pull_data->fetcher,
+                                     pull_data->content_mirrorlist,
+                                     delta_name, OSTREE_FETCHER_REQUEST_OPTIONAL_CONTENT,
+                                     OSTREE_MAX_METADATA_SIZE,
+                                     0, pull_data->cancellable,
+                                     on_superblock_fetched,
+                                     g_steal_pointer (&fetch_data));
+  pull_data->n_outstanding_metadata_fetches++;
+  pull_data->n_requested_metadata++;
+}
+
+static void
+enqueue_one_static_delta_superblock_request_s (OtPullData          *pull_data,
+                                               FetchDeltaSuperData *fetch_data)
+{
+  if (fetcher_queue_is_full (pull_data))
+    {
+      g_debug ("queuing fetch of static delta superblock %s-%s",
+               fetch_data->from_revision ?: "empty",
+               fetch_data->to_revision);
+
+      g_hash_table_add (pull_data->pending_fetch_delta_superblocks,
+                        g_steal_pointer (&fetch_data));
+    }
+  else
+    {
+      start_fetch_delta_superblock (pull_data, g_steal_pointer (&fetch_data));
+    }
+}
+
+/* Start a request for a static delta */
+static void
+enqueue_one_static_delta_superblock_request (OtPullData                *pull_data,
+                                             const char                *from_revision,
+                                             const char                *to_revision,
+                                             const OstreeCollectionRef *ref)
+{
+  FetchDeltaSuperData *fdata = g_new0(FetchDeltaSuperData, 1);
+  fdata->pull_data = pull_data;
+  fdata->from_revision = g_strdup (from_revision);
+  fdata->to_revision = g_strdup (to_revision);
+  fdata->requested_ref = (ref != NULL) ? ostree_collection_ref_dup (ref) : NULL;
+
+  enqueue_one_static_delta_superblock_request_s (pull_data, g_steal_pointer (&fdata));
+}
+
 static gboolean
 validate_variant_is_csum (GVariant       *csum,
                           GError        **error)
@@ -3260,31 +3329,6 @@ reinitialize_fetcher (OtPullData *pull_data, const char *remote_name,
   return TRUE;
 }
 
-/* Start a request for a static delta */
-static void
-initiate_delta_request (OtPullData *pull_data,
-                        const char *from_revision,
-                        const char *to_revision,
-                        const OstreeCollectionRef *ref)
-{
-  g_autofree char *delta_name =
-    _ostree_get_relative_static_delta_superblock_path (from_revision, to_revision);
-  FetchDeltaSuperData *fdata = g_new0(FetchDeltaSuperData, 1);
-  fdata->pull_data = pull_data;
-  fdata->from_revision = g_strdup (from_revision);
-  fdata->to_revision = g_strdup (to_revision);
-  fdata->requested_ref = (ref != NULL) ? ostree_collection_ref_dup (ref) : NULL;
-
-  _ostree_fetcher_request_to_membuf (pull_data->fetcher,
-                                     pull_data->content_mirrorlist,
-                                     delta_name, OSTREE_FETCHER_REQUEST_OPTIONAL_CONTENT,
-                                     OSTREE_MAX_METADATA_SIZE,
-                                     0, pull_data->cancellable,
-                                     on_superblock_fetched, fdata);
-  pull_data->n_outstanding_metadata_fetches++;
-  pull_data->n_requested_metadata++;
-}
-
 /*
  * initiate_request:
  * @ref: Optional ref name and collection ID
@@ -3335,10 +3379,10 @@ initiate_request (OtPullData                 *pull_data,
           }
           break;
         case DELTA_SEARCH_RESULT_FROM:
-          initiate_delta_request (pull_data, deltares.from_revision, to_revision, ref);
+          enqueue_one_static_delta_superblock_request (pull_data, deltares.from_revision, to_revision, ref);
           break;
         case DELTA_SEARCH_RESULT_SCRATCH:
-          initiate_delta_request (pull_data, NULL, to_revision, ref);
+          enqueue_one_static_delta_superblock_request (pull_data, NULL, to_revision, ref);
           break;
         case DELTA_SEARCH_RESULT_UNCHANGED:
           {
@@ -3390,14 +3434,14 @@ initiate_request (OtPullData                 *pull_data,
       if (delta_from_revision && g_str_equal (delta_from_revision, to_revision))
         queue_scan_one_metadata_object (pull_data, to_revision, OSTREE_OBJECT_TYPE_COMMIT, NULL, 0, ref);
       else
-        initiate_delta_request (pull_data, delta_from_revision ?: NULL, to_revision, ref);
+        enqueue_one_static_delta_superblock_request (pull_data, delta_from_revision ?: NULL, to_revision, ref);
     }
   else
     {
       /* Legacy path without a summary file - let's try a scratch delta, if that
        * doesn't work, it'll drop down to object requests.
        */
-      initiate_delta_request (pull_data, NULL, to_revision, NULL);
+      enqueue_one_static_delta_superblock_request (pull_data, NULL, to_revision, NULL);
     }
 
   return TRUE;
@@ -3597,6 +3641,7 @@ ostree_repo_pull_with_options (OstreeRepo             *self,
   pull_data->pending_fetch_metadata = g_hash_table_new_full (ostree_hash_object_name, g_variant_equal,
                                                              (GDestroyNotify)g_variant_unref,
                                                              (GDestroyNotify)fetch_object_data_free);
+  pull_data->pending_fetch_delta_superblocks = g_hash_table_new_full (NULL, NULL, (GDestroyNotify) fetch_delta_super_data_free, NULL);
   pull_data->pending_fetch_deltaparts = g_hash_table_new_full (NULL, NULL, (GDestroyNotify)fetch_static_delta_data_free, NULL);
 
   if (opt_localcache_repos && *opt_localcache_repos)
@@ -4560,6 +4605,7 @@ ostree_repo_pull_with_options (OstreeRepo             *self,
   g_clear_pointer (&pull_data->requested_metadata, (GDestroyNotify) g_hash_table_unref);
   g_clear_pointer (&pull_data->pending_fetch_content, (GDestroyNotify) g_hash_table_unref);
   g_clear_pointer (&pull_data->pending_fetch_metadata, (GDestroyNotify) g_hash_table_unref);
+  g_clear_pointer (&pull_data->pending_fetch_delta_superblocks, (GDestroyNotify) g_hash_table_unref);
   g_clear_pointer (&pull_data->pending_fetch_deltaparts, (GDestroyNotify) g_hash_table_unref);
   g_queue_foreach (&pull_data->scan_object_queue, (GFunc) scan_object_queue_data_free, NULL);
   g_queue_clear (&pull_data->scan_object_queue);